package com.fwmagic.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

/**
 * JUnit-driven walkthrough of the HBase 2.x client API: namespace and table
 * administration, single and batched writes, atomic row mutations, gets,
 * scans and filtered scans.
 *
 * <p>Every test talks to the cluster configured in {@link #init()}, so the
 * tests are integration demos rather than isolated unit tests. Each test
 * closes the {@link Table}, {@link BufferedMutator} and {@link ResultScanner}
 * instances it opens; the shared {@link Connection} and {@link Admin} are
 * opened in {@link #init()} and released in {@link #close()}.
 */
public class Hbase2xDemo {

    /** Namespace that holds the demo tables. */
    private static final String NAMESPACE = "test";

    /** Unqualified name of the main demo table. */
    private static final String TABLENAME = "person";

    /** Column family holding id / name / age. */
    private static final byte[] CF_BASE_INFO = Bytes.toBytes("base_info");

    /** Column family holding the address. */
    private static final byte[] CF_EXTRA_INFO = Bytes.toBytes("extra_info");

    /** Number of puts sent per round trip in {@link #batchPut()}. */
    private static final int BATCH_SIZE = 100;

    /** Number of rows written by the bulk-write demos. */
    private static final int ROW_COUNT = 100000;

    private Connection connection;
    private Admin admin;

    // Fully-qualified table name: (namespace, table).
    private final TableName tableName = TableName.valueOf(NAMESPACE, TABLENAME);

    /**
     * Opens the cluster connection and the admin handle used by every test.
     *
     * @throws Exception if the connection or the admin handle cannot be created
     */
    @Before
    public void init() throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "hd1:2181,hd2:2181,hd3:2181");
        connection = ConnectionFactory.createConnection(config);
        admin = connection.getAdmin();
    }

    /**
     * Creates the demo namespace and prints the name of every namespace.
     *
     * @throws Exception if the namespace cannot be created or listed
     */
    @Test
    public void createNameSpace() throws Exception {
        admin.createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
        for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
            System.out.println(descriptor.getName());
        }
    }

    /**
     * Creates the demo table with the two demo column families.
     *
     * @throws Exception if the table cannot be created
     */
    @Test
    public void createTable() throws Exception {
        admin.createTable(buildTableDescriptor(tableName));
    }

    /**
     * Creates a table that is pre-split at the given row-key boundaries.
     *
     * @throws Exception if the table cannot be created
     */
    @Test
    public void createTableWithSplit() throws Exception {
        TableDescriptor tableDescriptor =
                buildTableDescriptor(TableName.valueOf(NAMESPACE, "t_spilt"));

        // Pre-split boundaries.
        byte[][] splitKeys = {
                Bytes.toBytes("a"),
                Bytes.toBytes("f"),
                Bytes.toBytes("k"),
                Bytes.toBytes("q"),
                Bytes.toBytes("v")
        };

        admin.createTable(tableDescriptor, splitKeys);
    }

    /**
     * Removes all data from the demo table, preserving its splits.
     *
     * @throws Exception if the table cannot be disabled or truncated
     */
    @Test
    public void truncateTable() throws Exception {
        disableIfEnabled(tableName);
        admin.truncateTable(tableName, true);
    }

    /**
     * Drops the demo table.
     *
     * @throws Exception if the table cannot be disabled or deleted
     */
    @Test
    public void dropTable() throws Exception {
        disableIfEnabled(tableName);
        admin.deleteTable(tableName);
    }

    /**
     * Writes a single row through {@link Table#put(Put)}.
     *
     * @throws Exception if the write fails
     */
    @Test
    public void put() throws Exception {
        try (Table table = connection.getTable(tableName)) {
            table.put(personPut("r001", "1", "zhangsan", "18", "sh"));
        }
    }

    /**
     * Writes {@value #ROW_COUNT} rows through {@link Table#put(java.util.List)},
     * sending them in chunks of {@value #BATCH_SIZE} so that no single request
     * carries the whole data set.
     *
     * @throws Exception if any batch fails
     */
    @Test
    public void batchPut() throws Exception {
        try (Table table = connection.getTable(tableName)) {
            ArrayList<Put> puts = new ArrayList<>(BATCH_SIZE);
            for (int i = 1; i <= ROW_COUNT; i++) {
                String n = String.valueOf(i);
                puts.add(personPut("r" + i, n, "zhangsan" + i, n, "sh" + i));
                if (puts.size() >= BATCH_SIZE) {
                    table.put(puts);
                    puts.clear();
                }
            }
            // Send the remainder only if there is one; ROW_COUNT may be an
            // exact multiple of BATCH_SIZE, leaving nothing to write.
            if (!puts.isEmpty()) {
                table.put(puts);
            }
        }
    }

    /**
     * Writes a single row through a {@link BufferedMutator}.
     *
     * @throws Exception if the write fails
     */
    @Test
    public void mutate() throws Exception {
        // try-with-resources closes the mutator even when mutate() throws.
        try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
            bufferedMutator.mutate(personPut("r001", "1", "zhangsan", "18", "sh"));
        }
    }

    /**
     * Applies a delete and two puts to one row as a single atomic operation.
     *
     * @throws Exception if the mutation fails
     */
    @Test
    public void rowMutations() throws Exception {
        byte[] row = Bytes.toBytes("r001");
        byte[] family = Bytes.toBytes("f1");

        // Delete one column.
        Delete delete = new Delete(row);
        delete.addColumn(family, Bytes.toBytes("age"));

        // Overwrite one column.
        Put edit = new Put(row);
        edit.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes("lisi"));

        // Add one column.
        Put put = new Put(row);
        put.addColumn(family, Bytes.toBytes("address"), Bytes.toBytes("beijing"));

        // Bundle the three operations and submit them together.
        RowMutations rowMutations = new RowMutations(row);
        rowMutations.add(delete);
        rowMutations.add(edit);
        rowMutations.add(put);

        try (Table table = connection.getTable(TableName.valueOf("tt"))) {
            table.mutateRow(rowMutations);
        }
    }

    /**
     * Writes {@value #ROW_COUNT} rows through a {@link BufferedMutator}.
     *
     * <p>{@code Table.put} is synchronous, so a client doing a dense bulk
     * write has to wait for each put to finish. A {@code BufferedMutator}
     * instead collects mutations in a client-side buffer and submits them to
     * the cluster later.
     *
     * @throws Exception if any mutation fails
     */
    @Test
    public void batchMutate() throws Exception {
        try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
            for (int i = 1; i <= ROW_COUNT; i++) {
                String n = String.valueOf(i);
                bufferedMutator.mutate(personPut("r" + i, n, "lisi" + i, n, "sh" + i));
            }
        }
    }

    /**
     * Fetches one row by row key and prints each of its cells.
     *
     * @throws Exception if the read fails
     */
    @Test
    public void get() throws Exception {
        try (Table table = connection.getTable(tableName)) {
            Result result = table.get(new Get(Bytes.toBytes("r1")));
            while (result.advance()) {
                printCell(result.current());
            }
        }
    }

    /**
     * Scans the row-key range from {@code r1} to {@code r1006} and prints
     * every cell returned.
     *
     * @throws Exception if the scan fails
     */
    @Test
    public void scan() throws Exception {
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes("r1"));
        scan.withStopRow(Bytes.toBytes("r1006"));

        try (Table table = connection.getTable(tableName)) {
            printScan(table, scan);
        }
    }

    /**
     * Deletes one column of one row.
     *
     * @throws Exception if the delete fails
     */
    @Test
    public void delete() throws Exception {
        Delete delete = new Delete(Bytes.toBytes("r1"));
        delete.addColumn(CF_EXTRA_INFO, Bytes.toBytes("address"));

        try (Table table = connection.getTable(tableName)) {
            table.delete(delete);
        }
    }

    /**
     * Scans the row-key range from {@code r1} to {@code r3} with a value
     * filter and prints every cell returned.
     *
     * @throws Exception if the scan fails
     */
    @Test
    public void scanWithFilter() throws Exception {
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes("r1"));
        scan.withStopRow(Bytes.toBytes("r3"));

        // A filter is a CompareOperator plus a ByteArrayComparable implementation.
        // Alternatives that can be swapped in here:
        //   new RowFilter(CompareOperator.EQUAL, new SubstringComparator("002"))
        //   new RowFilter(CompareOperator.EQUAL, new BinaryPrefixComparator("r222".getBytes()))
        //   new PrefixFilter("r111".getBytes())
        //   new FamilyFilter(CompareOperator.EQUAL, new BinaryPrefixComparator("base".getBytes()))
        //   new ValueFilter(CompareOperator.EQUAL, new BinaryPrefixComparator("1".getBytes()))
        Filter filter = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("lisi19"));
        scan.setFilter(filter);

        try (Table table = connection.getTable(tableName)) {
            printScan(table, scan);
        }
    }

    /**
     * Releases the admin handle and the connection.
     *
     * <p>Both fields may still be {@code null} if {@link #init()} failed, and
     * the connection is closed even when closing the admin handle throws.
     *
     * @throws Exception if closing either resource fails
     */
    @After
    public void close() throws Exception {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Builds a descriptor for {@code name} with the two demo column families.
     *
     * @param name fully-qualified name of the table to describe
     * @return the table descriptor
     */
    private static TableDescriptor buildTableDescriptor(TableName name) {
        return TableDescriptorBuilder.newBuilder(name)
                .setColumnFamilies(Arrays.asList(
                        buildColumnFamily(CF_BASE_INFO),
                        buildColumnFamily(CF_EXTRA_INFO)))
                .build();
    }

    /**
     * Builds a column family that keeps 3 versions and uses a ROWCOL bloom filter.
     *
     * @param family column family name
     * @return the column family descriptor
     */
    private static ColumnFamilyDescriptor buildColumnFamily(byte[] family) {
        return ColumnFamilyDescriptorBuilder.newBuilder(family)
                .setMaxVersions(3)
                .setBloomFilterType(BloomType.ROWCOL)
                .build();
    }

    /**
     * Builds the put for one demo "person" row.
     *
     * @param rowKey  row key
     * @param id      value for {@code base_info:id}
     * @param name    value for {@code base_info:name}
     * @param age     value for {@code base_info:age}
     * @param address value for {@code extra_info:address}
     * @return the populated put
     */
    private static Put personPut(String rowKey, String id, String name, String age, String address) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(CF_BASE_INFO, Bytes.toBytes("id"), Bytes.toBytes(id));
        put.addColumn(CF_BASE_INFO, Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(CF_BASE_INFO, Bytes.toBytes("age"), Bytes.toBytes(age));
        put.addColumn(CF_EXTRA_INFO, Bytes.toBytes("address"), Bytes.toBytes(address));
        return put;
    }

    /**
     * Disables {@code name} unless it is already disabled, so that truncate
     * and drop can be re-run against a table left disabled by an earlier run.
     *
     * @param name table to disable
     * @throws Exception if the state check or the disable call fails
     */
    private void disableIfEnabled(TableName name) throws Exception {
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
    }

    /**
     * Runs {@code scan} against {@code table} and prints every returned cell,
     * closing the scanner when done.
     *
     * @param table table to scan
     * @param scan  scan specification
     * @throws Exception if the scan fails
     */
    private void printScan(Table table, Scan scan) throws Exception {
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                CellScanner cellScanner = result.cellScanner();
                while (cellScanner.advance()) {
                    printCellUtil(cellScanner.current());
                }
            }
        }
    }

    /**
     * Prints a cell by decoding its row, family, qualifier and value straight
     * from the cell's backing arrays.
     *
     * @param cell cell to print
     */
    private void printCell(Cell cell) {
        String r = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        String f = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        String q = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        String v = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
        System.out.println(String.format("r=%s,f=%s,q=%s,v=%s", r, f, q, v));
    }

    /**
     * Prints a cell using the {@link CellUtil} clone helpers.
     *
     * @param cell cell to print
     */
    private void printCellUtil(Cell cell) {
        String r = Bytes.toString(CellUtil.cloneRow(cell));
        String f = Bytes.toString(CellUtil.cloneFamily(cell));
        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
        String v = Bytes.toString(CellUtil.cloneValue(cell));
        System.out.println(String.format("r=%s,f=%s,q=%s,v=%s", r, f, q, v));
    }

}
